本次源码分析是以 GitHub 上 RxJava 仓库 2.1.1版本来进行的分析,同时以官方中文文档作为参考。
上一篇中,我们分析对比了同步和异步情况下,背压的原理,并针对各种情况,分析了不同的解决办法,总而言之一句话就是“不让上游的漏斗中的油流出来”。好了说了这么多,我们这篇来分享几个 Android 开发中的常见用法。这次对于过于简单的例子,我们就不详讲了,如线程切换用法(毕竟这是Rxjava的基础功能,我们还是在Rx上做文章较多).
对于集合的遍历操作
由于 Android 低版本(API 24 以下)不支持 Java8 的流式操作,因此我们可以使用 Rxjava 将其转化为支持流式操作。
List<String> list = Arrays.asList("Arirus","is","me","Welcome","to","my","blog");
String pattern="^[A-Z]+\\w{6}";
for (String content: list) {
if (Pattern.matches(pattern,content)) Log.i(ARIRUS, "onCreate: "+content);
}
Observable.fromIterable(list).filter(content->Pattern.matches(pattern,content)).blockingForEach(
s -> Log.i(ARIRUS, "accept: "+s));
list.stream()
.filter(content -> Pattern.matches(pattern, content))
.forEach(content -> Log.i(ARIRUS, "accept: " + content));
这里给出三个对比:
第一个是常规写法,对集合进行遍历,对于每一个遍历的对象进行判断进而进行响应的操作;第二个是使用 Rxjava 转换后,从一个集合中生成一个 Observable,将一个个的集合元素封装成元素发射到下游,同时进行过滤和 forEach 操作。最后注意要 blocking 一下转换成阻塞操作;第三个则是Java8支持的Stream操作,基本原理同上,字面意思更好理解。
因此,再不支持Java8的情况下,使用 Observable 转换来支持流操作是个不错的选择。
与 UI 结合使用
我觉得使用 RX 来进行 UI 操作是一种全新的体验。现在我们考虑一种情景:响应点击 Button 的点击事件,同时要求每秒内只能响应第一次,也就是我们常说的消抖功能,使用 RX 我们可以这么写:
private static class Ding {
Date time;
public Ding() {
time = new Date(System.currentTimeMillis());
}
}
Flowable.just(button)
.flatMap((Function<Button, Publisher<Ding>>) button1 ->
Flowable.create(emitter -> button1.setOnClickListener(v -> emitter.onNext(new Ding())),
BackpressureStrategy.ERROR))
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(ding -> Log.i(ARIRUS, "onCreate: " + ding.time));
//这里我们使用了,Flowable,当然也可以使用 Observable
这里开始对一个 Button 进行观察,通过 flatMap 使用将一个 Button 流,转换成了Button的点击事件流,同时对于流进行了限制,每秒只能有1个点击事件传输到下游。一个链式的调用很舒服。
再另举一个例子:登录界面,要求两个 EditText 都填上了相应的内容才可以点击登录按钮。如果使用 RX 我们可以这么写:
Flowable<CharSequence> flowable1 = Flowable.just(ed1)
.flatMap(editText -> Flowable.create((FlowableOnSubscribe<CharSequence>) emitter -> {
editText.addTextChangedListener(new TextWatcher() {
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
emitter.onNext(s);
}
...
});
emitter.onNext("");
}, BackpressureStrategy.ERROR));
Flowable<CharSequence> flowable2 = Flowable.just(ed2)
.flatMap(editText -> Flowable.create((FlowableOnSubscribe<CharSequence>) emitter -> {
editText.addTextChangedListener(new TextWatcher() {
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
emitter.onNext(s);
}
...
});
emitter.onNext("");
}, BackpressureStrategy.ERROR));
Flowable.combineLatest(flowable1, flowable2,
(charSequence, charSequence2) -> charSequence.length()> 0
&& charSequence.length() < 20
&& charSequence2.length() > 0
&& charSequence2.length() < 10).subscribe(button::setEnabled);
这里面简略了 TextWatcher 的方法,思路还是同上, 将每个 EditText 的观察转换为其“内容变化”的观察,同时对于观察的数据进行判断,符合规定的话,才会做响应的操作。 类似的操作还有很多,这里就不一一举例了,可以使用 RxBinding 库来进行更为细致的操作。
操作数据源
通常对于某些情况,我们的数据源来源不是一个。例如,淘宝App,首页资讯在断网打开的情况下,依然会显示,因为其在本地对于图片等信息有缓存,因此有时会看到界面一闪的刷新一下,这就是来网络数据来的对界面进行了刷新。
我们可以使用 merge 方法来将不同的数据源进行合并。
Observable<String> observable1 =
Observable.interval(1, TimeUnit.SECONDS).take(5).map(aLong -> "资源1:"+aLong);
Observable<String> observable2 =
Observable.interval(2,1, TimeUnit.SECONDS).take(5).map(aLong -> "资源2:"+aLong);
Observable<String> observable3 =
Observable.interval(4,1, TimeUnit.SECONDS).take(5).map(aLong -> "资源3:"+aLong);
//将不同源的数据合并并打印出来
Observable.merge(observable1, observable2, observable3)
.subscribe(string -> Log.i(ARIRUS, "subscribe: " + string));
Log:
I/ARIRUS: subscribe: 资源1:0
I/ARIRUS: subscribe: 资源1:1
I/ARIRUS: subscribe: 资源2:0
I/ARIRUS: subscribe: 资源1:2
I/ARIRUS: subscribe: 资源2:1
I/ARIRUS: subscribe: 资源1:3
I/ARIRUS: subscribe: 资源2:2
I/ARIRUS: subscribe: 资源3:0
I/ARIRUS: subscribe: 资源1:4
I/ARIRUS: subscribe: 资源2:3
I/ARIRUS: subscribe: 资源3:1
I/ARIRUS: subscribe: 资源2:4
I/ARIRUS: subscribe: 资源3:2
I/ARIRUS: subscribe: 资源3:3
I/ARIRUS: subscribe: 资源3:4
// 这里便将三个不同的数据源数据合并起来了。并将三个数据源的数据打印出来。
//对于某些情况,我们想要默认显示源1的数据,如果源2的数据来了,则显示后者的,可以结合 takeUntil 一起使用。这种情况,在某些咨询类页面较为常见。
Observable.merge(observable1.takeUntil(observable2), observable2.takeUntil(observable3), observable3)
.subscribe(string -> Log.i(ARIRUS, "subscribe: " + string));
Log:
I/ARIRUS: subscribe: 资源1:0
I/ARIRUS: subscribe: 资源2:0
I/ARIRUS: subscribe: 资源2:1
I/ARIRUS: subscribe: 资源3:0
I/ARIRUS: subscribe: 资源3:1
I/ARIRUS: subscribe: 资源3:2
I/ARIRUS: subscribe: 资源3:3
I/ARIRUS: subscribe: 资源3:4
//这样在第一个发射的过程中,如果发现源2开始发射数据,则前者停止发射,同理源3
//当然也可以结合 take() 来取钱几个内容,起到一个优先级的作用,这里就不详细讲了
与 RecyclerView 的结合使用
RecyclerView 应该是最常用的控件之一,应为手机设备的特性,所以经常使用 RecyclerView 来进行浏览信息。使用 RecyclerView 有两个需要注意的地方:
1.如何添加数据;
2.如何响应 itemCell 的点击事件
由于通常显示数据的时候,不会一下显示完,而是一部分一部分的添加显示(微博),同时 RecyclerView 没有itemCell的点击事件监听。因此 RecyclerView 使用的时候通常需要二次封装。那么常规的解决方式应该是:
1.监听 RecyclerView 滑动事件;
2.设置 Adapter 的 ClickListener
如果这里我们将 Rxjava 与 RecyclerView 进行结合,那么可以更清晰的方式来解决上述问题。
//调用处进行响应的订阅,用来接收通知事件等。
mCustomAdapter.clickEvent.subscribe(integer -> Toast.makeText(this, "点击item:"+integer, Toast.LENGTH_SHORT)
.show());
mCustomAdapter.updateEvent.subscribe(updateEvent -> mPresenter.update(mPage++));
private static class CustomAdapter extends RecyclerView.Adapter<ItemView> {
private List<Integer> mList;
private PublishProcessor<Integer> mProcessorClick; //接收或者发射 点击 itemCell的 position
private PublishProcessor<UpdateEvent> mProcessorUpdate; //接收或者发射 更新 list的通知
public Flowable<Integer> clickEvent ; //暴露对象
public Flowable<UpdateEvent> updateEvent; //暴露对象
boolean isLoading; //更新标志位
public void setLoading(boolean loading) {
isLoading = loading;
}
public CustomAdapter() {
mList =new ArrayList<>();
mProcessorClick = PublishProcessor.create();
mProcessorUpdate : PublishProcessor.create();
clickEvent = mProcessorClick;
updateEvent = mProcessorUpdate;
}
@Override
public ItemView onCreateViewHolder(ViewGroup parent, int viewType) {
View view = LayoutInflater.from(parent.getContext())
.inflate(android.R.layout.simple_list_item_activated_1, parent, false);
return new ItemView(view);
}
@Override
public void onBindViewHolder(ItemView holder, int position) {
holder.bind(mList.get(position) , mProcessorClick);
if (position+2>=getItemCount() && !isLoading){
mProcessorUpdate.onNext(UpdateEvent.INSTANCE);
isLoading = true;
}
}
@Override
public int getItemCount() {
return mList.size();
}
enum UpdateEvent {
INSTANCE
}
}
private static class ItemView extends RecyclerView.ViewHolder {
private final TextView mTextView;
public ItemView(View itemView) {
super(itemView);
mTextView = itemView.findViewById(android.R.id.text1);
}
public void bind(Integer integer,PublishProcessor<Integer> processor) {
mTextView.setText(String.valueOf(integer));
mTextView.setOnClickListener(v-> processor.onNext(integer));
}
}
PublishProcessor 在之前我们说过,其类似与 PublishSubject 既可以接收事件也可以发送事件,这里我们便利用了这一特性。当 itemCell 被点击的时候,或者当 RecyclerView 需要添加新的数据的时候,便会向 PublishProcessor 发射通知,而在外部订阅出会收到通知,从而进行响应的操作。这样将所有绑定操作都分配到 Adapter 和 itemCell 的内部,外部只需要暴露出一个 Observable,外部订阅后便会都到通知。
RxBus 初探
事件总线等相关概念我们在这里不再重复强调,这里只是来介绍 RxBus ,看看后期有没有可能写一篇 RxBus 与 EventBus 对比的文章了(挖坑)。 这里我们给 RxBus 提几个要求:
1.满足事件发布,订阅的需求
2.针对同类型事件,支持已经不同标志位进行订阅
3.支持 Sticky 事件
针对第一点,上面的 RecyclerView 其实已经为我们指出使用方法了,使用 PublishProcessor ,可以在不同地方进行发布和订阅。注意由于 RxBus 可以在多线程中使用因此,使用 FlowableProcessor 更好一些,因为后者是线程安全的,可以保证事件前后发送和接收顺序一致性。第二点,我们需要对同一类型的事件,进行区分,因此在发布时我们需要加上一个 TAG 标志位,同时根据 TAG 进行接收时的过滤。第三点,由于 PublishProcessor 的特性(不会保留之前的事件,只会发送订阅之后的发布的事件),因此另需要一个 HashMap 来存储(BehaviorProcessor 也不行,因为我们是将所有的事件都发送到同一个 Processor 中,因此无法保证前一个和后一个事件是同一类型的)。因此我们可以向如下这样写。
public enum RxBus {
INSTANCE;
FlowableProcessor<Object> mFlowableProcessor;
ConcurrentHashMap<Object, CompositeDisposable> mDisposableMap;
ConcurrentHashMap<Class, Event> mStickyMap;
RxBus() {
mFlowableProcessor = PublishProcessor.create().toSerialized();
}
public void post(Object o, String tag, boolean isSticky) {
Event event = new Event(o, tag);
mFlowableProcessor.onNext(event);
if (isSticky) addStickyMap(event);
}
public <T> Disposable doSubscribe(Class<T> type, String tag,boolean isSticky, Consumer<T> consumer) {
Flowable<Event> flowable = mFlowableProcessor.ofType(Event.class);
if (isSticky && mStickyMap.get(type)!=null)
return Flowable.merge(Flowable.just(mStickyMap.get(type)),flowable)
.filter(event -> event.mObject.getClass() == type && tag.equals(event.mTag))
.map(event -> (T)event.mObject).subscribe(o->consumer.accept((T)o));
return flowable
.filter(event -> event.mObject.getClass() == type && tag.equals(event.mTag))
.map(event -> (T)event.mObject)
.subscribe(o -> consumer.accept(o));
}
...
private static class Event {
Object mObject;
String mTag;
public Event(Object object, String tag) {
mObject = object;
mTag = tag;
}
}}
}
这里简写了部分内容,只保留了发布和订阅部分。逻辑其实和上面的 RecyclerView 封装是一样的,使用一个 FlowableProcessor 来发布和订阅事件,同时使用一个 HashMap 保存 Sticky 事件实例,订阅时,如果此事件实例存在就和 FlowableProcessor 源一起订阅,否则只订阅后者,当然了这里我对于事件匹配的判读都是最直接方式进行的书写,没有进行封装。调用则可以采用如下方式调用
RxBus.INSTANCE.post("Click Post", "Click", false); //发送事件
RxBus.INSTANCE.doSubscribe(String.class, "Click", true, s -> textView.setText(s)) //接收事件
小结
本篇中,我们把 Android 的相关操作和 Rxjava 进行了简单的结合。其实站在 Rxjava 的角度,就只有两类,一类是冷 Observable,另一类是热 Observable ,针对不同的情况灵活使用使得代码更为灵活。